Design, Discuss, and Re-invent a Goroutine Container

Author: Mohan Liu
Abstract
In the preceding discussion, we explored the complexities of managing goroutines. This time, we aim to address those challenges. A fundamental aspect of software engineering is abstraction: within our scope, the goal is to devise a goroutine manager that handles the concurrency intricacies so that developers can concentrate on the core tasks at hand.
Languages like Java provide an ExecutorService or a thread pool to manage concurrency. However, before adopting a third-party pool from GitHub, it's worth analyzing the underlying principles of a thread pool, from the trade-offs in its API design to the details of its implementation, keeping the discussion both technical and instructive.
Objectives
1. Task Flexibility
The worker pool should be capable of handling a diverse array of computational patterns, ranging from complex tree/graph traversal algorithms to efficient map-reduce operations.
2. Controlled Concurrency
Meticulously manage the lifecycle of goroutines to prevent race conditions, deadlocks, and exhaustion of system resources. This means orchestrating goroutines in a way that optimizes performance while adhering to a predefined concurrency model.
3. Hierarchical Task Delegation
Allow a goroutine to spawn sub-tasks responsibly while maintaining strict adherence to the overarching concurrency constraints set by our worker pool design.
4. Additional thoughts
Monitoring and Logging.
Consider if and how the system provides insight into its performance and operation. Does it offer logging, real-time monitoring, or analytics?
Integration with Existing Systems.
How easily can this worker pool be integrated with existing systems? Does it provide interfaces or hooks for common frameworks or libraries?
Customization and Extensibility.
Is the worker pool customizable? Can users extend its functionality, add plugins, or modify its behavior?
Security Considerations.
Are there any security features or compliance with security standards?
Performance Metrics.
Are there any performance benchmarks or goals the worker pool aims to meet?
5. Goroutine Management Encapsulation and Orchestration
Goroutine Number Specification.
Users can specify the exact number of goroutines, allowing a customized balance between concurrency and resource use.
Dynamic Resource Scaling.
The pool can dynamically add or remove goroutines in real time, adapting to the current load without service interruption; features like performance metrics and backpressure management would be good to have.
Context Integration.
Using a context as the cancellation and timeout signal ensures that goroutines can be stopped gracefully, aiding control of the resource lifecycle.
Resource Exhaustion Mitigation.
The design anticipates and mitigates resource depletion, with mechanisms to handle spikes in demand and ensure system stability.
Graceful Shutdown.
On termination, the worker pool guarantees the proper reclamation of resources, with each goroutine completing its current task to prevent data loss or corruption.
Task Result and Error Handling.
Tasks return results or errors in a consistent manner, allowing straightforward integration with the rest of the application.
OS Signal Handling.
The worker pool listens for OS signals, reacting appropriately to external commands such as graceful shutdown or emergency termination (see the sketch after this list).
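Several of these requirements (context integration, graceful shutdown, OS signal handling) compose naturally in Go. A minimal sketch, assuming `signal.NotifyContext` (Go 1.16+) as the single shutdown trigger; the worker count and timings are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	// ctx is cancelled when SIGINT or SIGTERM arrives, giving every
	// goroutine one shutdown signal to watch.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done(): // graceful shutdown requested
					fmt.Printf("worker %d: finishing up and exiting\n", id)
					return
				case <-time.After(500 * time.Millisecond):
					fmt.Printf("worker %d: completed a unit of work\n", id)
				}
			}
		}(i)
	}
	wg.Wait() // all goroutines reclaimed before the process exits
}
```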
API Design and Trade-off Discussion
Before delving into the complexities of managing goroutines, it's important to consider the components that will be integral to our design, as well as their interactions.
Overview
The enhanced structure of our goroutine container encompasses several intricately connected components that work in unison to streamline concurrent task management.
GoRoutineContainer: Serves as the primary orchestrator within our system. It utilizes a TaskQueue to schedule the execution of tasks and a ResultPipeline to handle the aggregation of results. Its key responsibilities include adding tasks to the queue and dispatching them to available Workers.
TaskQueue: Acts as a holding area for tasks awaiting processing. It is capable of enqueuing tasks and checking if the queue is empty, ensuring a steady supply of work for the Workers.
ResultPipeline: A dedicated channel for accumulating the results after task executions. It provides the functionality to add new results, facilitating the collection and subsequent processing of output data.
Task: The fundamental unit of work within our system. Each task encompasses an Execute method, which, upon invocation, yields a Result. This design encapsulates the work logic and outcome within a single entity.
Worker: These are the workhorses of the system, each linked to a Task. Workers are responsible for processing tasks—executing the contained logic—and collecting results, which are then reported back to the ResultPipeline.
Result: Represents the output of a Task. It includes a correlation ID to trace the result back to its originating task, ensuring that each piece of work can be accurately tracked and correlated with its outcome.
The flow within the system is as follows:
Task Allocation: The GoRoutineContainer places tasks into the TaskQueue.
Task Processing: Workers retrieve tasks from the TaskQueue, execute them, and generate results.
Result Aggregation: Workers submit the produced Results to the ResultPipeline.
Result Utilization: The ResultPipeline holds the results, which can then be used for further processing, analysis, or reporting.
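To make this concrete, here is a minimal interface sketch of the components above; the method sets are illustrative assumptions, not a final API:

```go
package container

// Result represents the output of a Task, with a correlation ID to trace
// it back to its originating task.
type Result struct {
	CorrelationID int64
	Value         interface{}
	Err           error
}

// Task is the fundamental unit of work; Execute yields a Result.
type Task interface {
	Execute() Result
}

// TaskQueue holds tasks awaiting processing.
type TaskQueue interface {
	Enqueue(t Task)
	Dequeue() (Task, bool) // second value is false when the queue is empty
	IsEmpty() bool
}

// ResultPipeline accumulates results after task execution.
type ResultPipeline interface {
	Add(r Result)
}

// Worker processes tasks and reports results to the ResultPipeline.
type Worker interface {
	Process(t Task)
}

// GoRoutineContainer orchestrates the flow: queueing tasks and
// dispatching them to available Workers.
type GoRoutineContainer interface {
	AddTask(t Task)
	Dispatch()
}
```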
Entities
1. Task
Flexibility considerations
Variadic Arguments.
Results and Errors Handling.
To handle multiple return values and errors, you could define a Result type that includes a slice of interface{} for results and an error field for error handling. After the task runs, the results and any errors are stored in a Result object, which can then be queued in the result queue.
Runnable/Executable Interface.
Taking inspiration from Java's Runnable interface, which has a single run() method, ensures simplicity. In Go, this could be an interface with a Run() method. This method should encapsulate the task's logic and be executable by a worker without additional context.
Self-Contained Logic.
Generics (Type Parameters).
If the language supports generics (like Java), they can be used to create a more type-safe way of passing arguments and receiving results, while still allowing for flexibility.
Context Passing.
For tasks that might need cancellation or a timeout, a context could be passed to the Run() method. This allows the caller to control task execution externally, such as cancelling a task that is taking too long to complete.
Callbacks or Events.
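Combining the Runnable, generics, and context-passing ideas, one hedged sketch in Go (the type and function names here are my own, not a settled API) might be:

```go
package task

import (
	"context"
	"time"
)

// Runnable is a Run()-style task that honors cancellation through its
// context and returns a typed result instead of a bare interface{}.
type Runnable[T any] interface {
	Run(ctx context.Context) (T, error)
}

// Result pairs a typed value with an error, ready for a result queue.
type Result[T any] struct {
	Value T
	Err   error
}

// RunWithTimeout executes t under a deadline, letting the caller cancel
// a task that takes too long to complete.
func RunWithTimeout[T any](parent context.Context, t Runnable[T], d time.Duration) Result[T] {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()
	v, err := t.Run(ctx)
	return Result[T]{Value: v, Err: err}
}
```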
Implementation Considerations
Option 1: Object-Oriented Approach (Interface with a Run Method)
type Task interface {
	Perform() error
}

type SomeTask struct {
	SomeData string
}

func (st *SomeTask) Perform() error {
	// ...
	return nil
}

type TaskExecutor struct{}

func (te *TaskExecutor) Execute(t Task) error {
	return t.Perform()
}
Option 2: Functional Approach (Closure)
type TaskFunc func() error

func someTask(someVariable string) TaskFunc {
	return func() error {
		// Read the file
		// ...
		return nil
	}
}

func ExecuteTask(task TaskFunc) error {
	return task()
}
Consideration | Object-Oriented (OOP) | Functional (FP with closures) |
---|---|---|
Extendability | Utilizes classes and interfaces to seamlessly facilitate the expansion of code. Ideal for anticipated future development. | -- |
Development Complexity | Offers a structured framework that simplifies developing large or intricate systems through clear separation of concerns. | -- |
Expression | -- | Employs concise functions leading to elegant, expressive solutions for straightforward tasks. |
Cognitive Load | -- | May introduce a higher cognitive burden due to complex scopes and challenges in modular design as the application scales. |
Task Composition | Enables the creation of new instances by passing the result of one task to another, fostering maintainability. | Task functions are created from the results of other tasks, which can lead to less readable nested functions and increased mental mapping in multi-step sequences. |
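The Task Composition row is easiest to see in the functional style. A small sketch, assuming string-valued tasks for simplicity; the Then helper is hypothetical:

```go
package main

import "fmt"

type TaskFunc func() (string, error)

// Then builds a new task whose input is the result of the first task,
// the nested-closure composition the table refers to.
func Then(first TaskFunc, next func(string) TaskFunc) TaskFunc {
	return func() (string, error) {
		v, err := first()
		if err != nil {
			return "", err
		}
		return next(v)()
	}
}

func main() {
	fetch := func() (string, error) { return "raw-data", nil }
	parse := func(in string) TaskFunc {
		return func() (string, error) { return "parsed:" + in, nil }
	}
	out, err := Then(fetch, parse)()
	fmt.Println(out, err) // parsed:raw-data <nil>
}
```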
2. Workers
How to control the number of goroutines
Option 1: Semaphore tokens
var sem = make(chan struct{}, maxGoroutines)

for task := range tasks {
	sem <- struct{}{} // Acquire a token
	go func(task Task) {
		defer func() { <-sem }() // Release the token
		// ... do the task
	}(task)
}
Option 2: Fixed number of goroutines (pub-sub pattern)
// worker processes tasks from the channel.
func worker(tasks <-chan Task) {
	for task := range tasks {
		// ... do the task
	}
}

// Start a fixed number of workers that all consume from the same channel.
tasks := make(chan Task, numberOfTasks)
for i := 0; i < maxGoroutines; i++ {
	go worker(tasks)
}
Feature | Semaphore Approach | Worker Pool Approach |
---|---|---|
Scaling | Changing the concurrency level is non-trivial | Relatively easy to add more workers |
Memory Usage | Potentially higher with many short-lived goroutines | Generally lower and more predictable |
Overhead | Goroutine creation/destruction overhead for each task | No overhead from spinning up/down goroutines for each task |
Complexity | Simple to implement, harder to adjust dynamically | More complex to implement, easier to adjust dynamically |
Task Execution Order | No inherent prioritization, FIFO if channel is used | No inherent prioritization, FIFO if channel is used |
Dynamic Adjustability | Not straightforward to change channel capacity at runtime | Can simply spin up new workers at runtime if needed |
In short, even though the worker pool approach sounds a bit more complex, it generally provides better extensibility, so the worker pool approach is chosen.
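One reason the worker pool approach scores well on dynamic adjustability: workers share only the tasks channel, so scaling up at runtime is just launching more goroutines against it. A minimal sketch (the counts and task bodies are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

type Task func()

// addWorkers launches n more workers against the same tasks channel;
// calling it again later scales the pool up without extra coordination.
func addWorkers(n int, tasks <-chan Task, wg *sync.WaitGroup) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks {
				task()
			}
		}()
	}
}

func main() {
	tasks := make(chan Task, 8)
	var wg sync.WaitGroup
	addWorkers(2, tasks, &wg) // initial pool size
	addWorkers(2, tasks, &wg) // scale up later, e.g. under load
	for i := 0; i < 4; i++ {
		i := i
		tasks <- func() { fmt.Println("ran task", i) }
	}
	close(tasks)
	wg.Wait()
}
```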
3. Result
When designing a result object within the context of a worker pool, it's crucial to consider how the container will manage and utilize the outcomes of the tasks it processes. The design of the result object can be significantly influenced by the nature of the tasks and the importance of their individual results.
For heterogeneous tasks, where each task's result is of distinct importance, the result object may take on characteristics similar to a promise-like construct found in many asynchronous programming environments. This type of result object would typically allow for encapsulation of each task's outcome, enabling the following features:
Asynchronous Resolution: The result object could provide a mechanism to asynchronously retrieve the outcome of a task once it's available, much like how a promise resolves with a value.
Error Handling: It could include methods for catching and handling errors that may occur during task execution.
Chaining: It might also support chaining operations, where a subsequent task could be queued or executed depending on the result of the current one.
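A minimal sketch of such a promise-like construct in Go, assuming a one-shot channel for resolution (Go has no built-in promise type):

```go
package main

import (
	"errors"
	"fmt"
)

// Promise resolves exactly once with a value or an error.
type Promise struct {
	done  chan struct{}
	value interface{}
	err   error
}

func NewPromise() *Promise { return &Promise{done: make(chan struct{})} }

// Resolve fulfills the promise; it must be called exactly once.
func (p *Promise) Resolve(v interface{}, err error) {
	p.value, p.err = v, err
	close(p.done) // releases everyone blocked in Await
}

// Await blocks until the outcome is available: asynchronous resolution.
func (p *Promise) Await() (interface{}, error) {
	<-p.done
	return p.value, p.err
}

func main() {
	p := NewPromise()
	go p.Resolve(nil, errors.New("task failed")) // a worker resolves it
	if _, err := p.Await(); err != nil {
		fmt.Println("caught:", err) // error handling on the result object
	}
}
```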
On the other hand, for bulk operations, such as Conway's Game of Life or tasks amenable to a map-reduce paradigm, individual task results may be less significant compared to the collective progress or final aggregated result. In such scenarios, the design of the result object might prioritize different aspects:
Progress Tracking: The result object could focus on tracking the cumulative progress of all tasks, possibly through counters or progress events.
Aggregation: It might provide methods to combine or reduce results from individual tasks into a coherent whole.
Notification: The result object could be designed to notify the container when all tasks have reached a certain milestone or when the entire operation is complete.
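For the bulk case, a hedged sketch using atomic counters for progress tracking and aggregation, with WaitGroup completion serving as the notification; the workload is illustrative:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const tasks = 100
	var completed atomic.Int64 // progress tracking
	var total atomic.Int64     // running aggregate of all task results
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			total.Add(int64(n)) // fold this task's result into the aggregate
			completed.Add(1)    // bump cumulative progress
		}(i)
	}

	wg.Wait() // notification: every task reached the "done" milestone
	fmt.Printf("completed %d/%d tasks, aggregate=%d\n", completed.Load(), tasks, total.Load())
}
```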
Overall, the design of the result object should align with the broader goals and concerns of the worker pool's container, ensuring that it provides the necessary functionality to manage task outcomes effectively, whether they're handled individually or in aggregate.
Considerations
Mechanism | Heterogeneous Tasks Suitability | Bulk Operations Suitability |
---|---|---|
Callback | Suitable for individual notifications, but can become complex with diverse outcomes. | Less suitable due to challenges in result aggregation and callback management. |
Channel | Effective for task result communication, especially when results are significant and diverse. | Ideal for concurrent result processing and aggregation, aligns well with Go's concurrency model. |
Future/Promise | Highly suitable for encapsulating asynchronous results, with built-in error handling and result chaining. | Possible with additional logic for result collection and aggregation; Mono aligns well for single eventual outcomes. |
Shared Variable | Less recommended due to synchronization complexity for diverse results. | Effective when used with synchronization mechanisms for result aggregation. |
Mono | Well-suited for tasks requiring a single or no result, with reactive handling of results and errors. | Can represent the final aggregated result, but might require a collective operation like Mono.zip or Mono.when for multiple Monos. |
Flux | Less common as it's geared towards data streams rather than discrete task results. | Particularly well-suited for managing data streams and allows for sophisticated operations like filtering, reduction, and transformation in reactive programming. |
A channel is chosen because it handles both heterogeneous tasks and bulk operations effectively.
type TaskStatus string
const (
TaskSuccess TaskStatus = "SUCCESS"
TaskFailed TaskStatus = "FAILED"
TaskInProgress TaskStatus = "IN_PROGRESS"
)
type TaskResult struct {
TaskID string // Unique identifier for the task
Result interface{} // The actual result of the task, can be any type
Status TaskStatus // Optional for Promise, The status of the task
Error error // Error information, if applicable
CreatedAt time.Time // Timestamp when the task was created
CompletedAt time.Time // Timestamp when the task was completed
}
var resultChan chan TaskResult // e.g. resultChan = make(chan TaskResult, bufferSize)
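A short consumer sketch for these types, assuming workers send TaskResult values on the channel and the pool closes it at shutdown:

```go
package main

import (
	"fmt"
	"time"
)

// TaskStatus and TaskResult are the types defined above.

func consume(results <-chan TaskResult) {
	for r := range results { // drains until the pool closes the channel
		if r.Status == TaskFailed {
			fmt.Printf("task %s failed: %v\n", r.TaskID, r.Error)
			continue
		}
		fmt.Printf("task %s finished in %s\n", r.TaskID, r.CompletedAt.Sub(r.CreatedAt))
	}
}

func main() {
	results := make(chan TaskResult, 2)
	now := time.Now()
	results <- TaskResult{TaskID: "t1", Status: TaskSuccess, CreatedAt: now, CompletedAt: now.Add(time.Millisecond)}
	close(results)
	consume(results)
}
```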
4. Tasks Queue
Key considerations:
Thread Safety / Concurrency Safety.
- This is about ensuring that the queue can handle concurrent access without data races or corruption.
Scalability:
- Queue Length: The length of the queue must be managed. An unbounded queue can lead to high memory usage and potential system instability. Conversely, a bounded queue must have a strategy for when it's full (e.g., blocking, dropping tasks, or pushing back on the producer).
Task Priority: Good to have, but the complexity grows.
- Implementation: using multiple queues or a priority queue for different priorities is a common approach; workers check the higher-priority queue first (see the sketch after this list). The risk is that low-priority tasks starve under a constant inflow of higher-priority tasks; strategies like aging (increasing the priority of tasks over time) can mitigate this.
Monitoring and Logging:
- Queue Metrics: Tracking metrics such as queue size over time, task wait times, and worker idle times can help in understanding system performance and bottlenecks.
- Logging: Detailed logs can aid in debugging issues and understanding system behavior. However, logging should not significantly impact performance.
- Health Checks: Regular health checks can monitor the status of the queue and workers to ensure they are functioning correctly and efficiently.
- Alerting: An alert system that triggers notifications when certain thresholds are breached (e.g., queue size exceeds a limit) can help prevent system overload.
Load Balancing:
- Task Distribution: Consider strategies for distributing tasks to workers in a way that balances the load, especially if tasks have varying complexity.
- Worker Performance: Tracking worker performance metrics could allow for smarter load balancing, sending more tasks to faster workers.
Graceful Degradation:
- Failure Handling: Decide how the system should degrade in the face of individual task failures. Should the task be retried, moved to a dead letter queue, or should there be a rollback mechanism?
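Returning to the Task Priority point above: with channels as queues, "check the higher-priority queue first" becomes a nested select. A minimal sketch (aging would layer on top of this):

```go
package main

import "fmt"

type Task struct{ Name string }

// nextTask prefers the high-priority queue and only then blocks on
// whichever queue delivers first.
func nextTask(high, low <-chan Task) Task {
	select {
	case t := <-high: // drain urgent work first
		return t
	default:
	}
	select { // nothing urgent queued right now
	case t := <-high:
		return t
	case t := <-low:
		return t
	}
}

func main() {
	high := make(chan Task, 1)
	low := make(chan Task, 1)
	low <- Task{"cleanup"}
	high <- Task{"payment"}
	fmt.Println(nextTask(high, low).Name) // payment
}
```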
Implementation Proposals
Mutex-Protected Queue
type Task struct {
	// Task fields and methods
}

type MutexQueue struct {
	tasks []Task
	lock  sync.Mutex
}

func (q *MutexQueue) Enqueue(task Task) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.tasks = append(q.tasks, task)
}

// Dequeue pops the oldest task, returning nil when the queue is empty.
func (q *MutexQueue) Dequeue() *Task {
	q.lock.Lock()
	defer q.lock.Unlock()
	if len(q.tasks) == 0 {
		return nil
	}
	task := q.tasks[0]
	q.tasks = q.tasks[1:]
	return &task
}
Atomic Operations
// Requires "sync/atomic" and "unsafe".
type Task struct {
	// Task fields and methods
}

type node struct {
	value Task
	next  *node
}

type AtomicQueue struct {
	head *node
	tail *node
}

func NewAtomicQueue() *AtomicQueue {
	dummy := &node{} // Dummy node
	return &AtomicQueue{head: dummy, tail: dummy}
}

// Enqueue is a Michael-Scott style lock-free append using CAS loops.
func (q *AtomicQueue) Enqueue(task Task) {
	newNode := &node{value: task}
	for {
		tail := loadNode(&q.tail)
		next := loadNode(&tail.next)
		if tail == loadNode(&q.tail) {
			if next == nil {
				if casNode(&tail.next, nil, newNode) {
					casNode(&q.tail, tail, newNode)
					return
				}
			} else {
				casNode(&q.tail, tail, next) // help a stalled enqueuer advance the tail
			}
		}
	}
}

func loadNode(ptr **node) *node {
	return (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(ptr))))
}

func casNode(ptr **node, old, new *node) bool {
	return atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(ptr)),
		unsafe.Pointer(old), unsafe.Pointer(new))
}
Lock-Free data structures (not covered, as they are out of scope)
Concurrent Collections
// Java example: java.util.concurrent provides ready-made concurrent collections.
class Task {
    void execute() { /* ... */ }
}

private ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();

public void addTask(Task task) {
    queue.offer(task);
}

public void processTasks() {
    Task task;
    while ((task = queue.poll()) != null) {
        task.execute();
    }
}
Channel-Based Queues
type Task struct {
	// Task fields and methods.
}

type ChannelQueue struct {
	taskChan chan Task
}

func NewChannelQueue(bufferSize int) *ChannelQueue {
	return &ChannelQueue{taskChan: make(chan Task, bufferSize)}
}

func (q *ChannelQueue) Enqueue(task Task) {
	q.taskChan <- task // Blocks when the buffer is full
}

func (q *ChannelQueue) Dequeue() (Task, bool) {
	task, ok := <-q.taskChan // Blocks when the buffer is empty
	return task, ok
}

func (q *ChannelQueue) Close() {
	close(q.taskChan)
}
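As a hedged extension of the queue above (TryEnqueue is my addition, not part of the proposal), the blocking Enqueue can be made non-blocking with select/default, one of the full-queue strategies discussed earlier:

```go
// TryEnqueue attempts a non-blocking send: it reports false instead of
// blocking when the buffer is full, leaving retry-or-drop to the caller.
func (q *ChannelQueue) TryEnqueue(task Task) bool {
	select {
	case q.taskChan <- task:
		return true
	default:
		return false
	}
}
```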
Comparisons
Implementation Proposal | Thread Safety: Pros and Cons | Monitoring and Logging: Ease and Effectiveness | Scalability: Queue Length and Full Queue Handling | Extensibility: Priority Management |
---|---|---|---|---|
Mutex-Protected Queue | Pros: Simple to implement; easy to understand. Cons: Mutex contention can be a bottleneck; locking/unlocking overhead. | Ease: Relatively easy to add logging around lock operations. Effectiveness: Effective for tracking queue access and contention issues. | Queue Length: Easy to manage with condition variables. Full Queue: Can implement blocking or dropping tasks when full. | Extensibility: Can extend to support priorities by using multiple queues or a priority queue data structure. |
Atomic Operations | Pros: Better under high contention; avoids locking overhead. Cons: Complex to implement; susceptible to the ABA problem. | Ease: Harder due to the low-level nature of operations. Effectiveness: Can be less intuitive to correlate atomic operations with higher-level actions. | Queue Length: More challenging to manage without locks. Full Queue: Requires additional logic to handle full queue scenarios, which can be complex. | Extensibility: More difficult to extend for priorities due to complexity. May require intricate algorithms to handle priority ordering atomically. |
Lock-Free Queues | Pros: No locks involved, can be faster for certain workloads. Cons: Very complex to implement; difficult to debug. | Ease: Difficult due to the complexity of the lock-free algorithms. Effectiveness: Offers detailed insights if implemented correctly, but can be very challenging. | Queue Length: Management can be very complex due to the lock-free design. Full Queue: Handling a full queue is difficult and often requires fallback to locking mechanisms. | Extensibility: Extending with priority management is highly challenging due to the complexity of ensuring lock-free consistency with priority ordering. |
Concurrent Collections | Pros: Provided by many standard libraries; high-level abstraction. Cons: Potentially less performant due to generality; may not fit all use cases. | Ease: Easier as many libraries come with built-in monitoring capabilities. Effectiveness: Generally effective and aligned with the abstraction level of the collection. | Queue Length: Often have built-in management. Full Queue: Typically have well-defined behavior for full queues, such as blocking or rejection policies. | Extensibility: Many concurrent collections already support priority management or can be easily extended by using priority queue implementations provided by the language or libraries. |
Channel-Based Queues | Pros: Native to some languages (e.g., Go channels); built-in safety. Cons: Can be language-specific; varying performance characteristics. | Ease: Moderate, depending on language support for introspection. Effectiveness: Can be very effective, especially if the language/runtime provides good tools for channel monitoring. | Queue Length: Can be defined at channel creation. Full Queue: Channels block by default when full, but can be designed for non-blocking behavior or dropping messages. | Extensibility: Not inherently designed for priority management. Implementing priority would require additional structures or a separate priority channel system. |
Conclusion and Code Snippet
After going over the details and looking at different options, I've put together a solid worker pool implementation for you.
In my next blog post, we'll dive into how to use this worker pool to tackle some common coding problems, including how to handle a map-reduce situation. Additionally, we will introduce enhanced visualization of the worker pool's progress using Prometheus and Grafana.
Keep an eye out – we're about to get into some really practical stuff that could make your coding life a whole lot easier.
package workerpool
import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// TaskFunc is the type of function that can be submitted to the worker pool.
// It returns a result and an error. You would replace interface{} with whatever
// result type your tasks are supposed to return.
type TaskFunc func() (interface{}, error)
// type TaskFuncWithId func(int64 id) (interface{}, error)
type TaskFuncWithId struct {
Task TaskFunc
TaskId int64
}
type SubmitResult struct {
TaskId int64
Result interface{}
Error error
}
type Logger interface {
Log(message string)
}
type defaultLogger struct{}
func (l *defaultLogger) Log(message string) {
fmt.Println(message)
}
type WorkerPoolConfig struct {
MaxWorkers int // Maximum number of worker goroutines
Timeout time.Duration // Maximum time to wait for task completion
TaskQueueSize int // Use with Caution
Logger Logger
}
type WorkerPool struct {
isStopped int32 // atomic flag
config WorkerPoolConfig
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
publishers chan TaskFuncWithId
workerStopChans []chan bool
taskId atomic.Int64
ResultChan chan SubmitResult
logger Logger
stats map[string]prometheus.Metric
server *http.Server
}
func NewWorkerPool(workerPoolConfig WorkerPoolConfig) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
pool := &WorkerPool{
config: workerPoolConfig,
ctx: ctx,
cancel: cancel,
publishers: make(chan TaskFuncWithId, workerPoolConfig.TaskQueueSize),
workerStopChans: make([]chan bool, workerPoolConfig.MaxWorkers),
taskId: atomic.Int64{},
ResultChan: make(chan SubmitResult, workerPoolConfig.TaskQueueSize),
}
if workerPoolConfig.Logger != nil {
pool.logger = workerPoolConfig.Logger
} else {
pool.logger = &defaultLogger{}
}
psRunningWorkers := prometheus.NewCounter(prometheus.CounterOpts{
Name: "worker_pool_running_workers",
Help: "The total number of running worker",
})
psTotalSubmittedTasks := prometheus.NewCounter(prometheus.CounterOpts{
Name: "worker_pool_total_submitted_tasks",
Help: "The total number of tasks submitted to the worker pool",
})
psTotalExecutedTasks := prometheus.NewCounter(prometheus.CounterOpts{
Name: "worker_pool_total_executed_tasks",
Help: "The total number of tasks executed by the worker pool",
})
psTasksQueueSize := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "worker_pool_tasks_queue_size",
Help: "The size of the tasks queue",
})
// Register it with the default registry
prometheus.MustRegister(psRunningWorkers)
prometheus.MustRegister(psTotalSubmittedTasks)
prometheus.MustRegister(psTotalExecutedTasks)
prometheus.MustRegister(psTasksQueueSize)
pool.stats = make(map[string]prometheus.Metric)
pool.stats["psTotalSubmittedTasks"] = psTotalSubmittedTasks
pool.stats["psTotalExecutedTasks"] = psTotalExecutedTasks
pool.stats["psRunningWorkers"] = psRunningWorkers
pool.stats["psTasksQueueSize"] = psTasksQueueSize
for i := 0; i < workerPoolConfig.MaxWorkers; i++ {
stopChan := make(chan bool)
pool.workerStopChans[i] = stopChan
pool.wg.Add(1)
psRunningWorkers.Inc()
go pool.worker(i+1, stopChan)
}
go pool.startPrometheus()
return pool
}
func (wp *WorkerPool) startPrometheus() {
// Create a ServeMux and register the Prometheus handler
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
// Create the server with the custom ServeMux
wp.server = &http.Server{
Addr: ":8080",
Handler: mux,
}
// Run the server in a goroutine so that it doesn't block
go func() {
if err := wp.server.ListenAndServe(); err != http.ErrServerClosed {
// Handle error
panic(err)
}
}()
// Set up a channel to listen for OS signals for graceful shutdown
// stop := make(chan os.Signal, 1)
// signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
// Block until a signal is received
<-wp.ctx.Done()
wp.logger.Log("prometheus server stopped")
// Create a context with a timeout for the shutdown
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Gracefully shutdown the server, waiting for any in-flight requests to complete
if err := wp.server.Shutdown(ctx); err != nil {
// Handle error
panic(err)
}
wp.logger.Log("prometheus server stopped")
}
// worker is a method on the WorkerPool that processes tasks from the taskQueue.
func (wp *WorkerPool) worker(id int, stopChan chan bool) {
defer wp.wg.Done()
// Note: psRunningWorkers is registered as a Counter, which cannot be
// decremented; tracking live workers here would require a Gauge instead.
// defer wp.stats["psRunningWorkers"].Dec()
defer wp.logger.Log(fmt.Sprintf("worker %d stopped\n", id))
for {
select {
case <-wp.ctx.Done(): // Check if context was cancelled (pool is stopping)
return
case <-time.After(wp.config.Timeout): // idle timeout: no task arrived within Timeout
wp.logger.Log(fmt.Sprintf("worker %d timed out\n", id))
return
case <-stopChan: // Check if this specific worker was told to stop
return
case task, ok := <-wp.publishers: // Wait for a task
if !ok {
// The publishers channel was closed, no more tasks will come
return
}
gauge, ok := wp.stats["psTasksQueueSize"].(prometheus.Gauge)
if ok {
gauge.Dec()
}
if task.Task != nil {
wp.logger.Log(fmt.Sprintf("worker %d is working on task %d\n", id, task.TaskId))
result, err := task.Task()
if err != nil {
wp.logger.Log(fmt.Sprintf("worker %d error on task %d: %v\n", id, task.TaskId, err))
}
counter, ok := wp.stats["psTotalExecutedTasks"].(prometheus.Counter)
if ok {
counter.Inc()
}
if result == nil && err == nil {
// Nothing to report: the task produced no result and no error.
continue
}
taskResult := SubmitResult{
TaskId: task.TaskId,
Result: result,
Error: err,
}
loop:
for {
select {
case wp.ResultChan <- taskResult:
// Task sent successfully
break loop
default:
// Channel is full, handle the case when the channel is full
wp.logger.Log(fmt.Sprintf("worker %d stuck on sending task %d result, resultChan is full, cannot send result\n", id, task.TaskId))
// TODO: instead of panic what to do??
panic("resultChan is full, cannot send result")
}
}
}
}
}
}
// Submit enqueues a task and returns its id plus the pool's shared result channel.
func (wp *WorkerPool) Submit(task TaskFunc) (int64, <-chan SubmitResult, error) {
if atomic.LoadInt32(&wp.isStopped) == 1 {
return 0, nil, errors.New("worker pool is not accepting new tasks")
}
taskId := wp.taskId.Add(1)
counter, ok := wp.stats["psTotalSubmittedTasks"].(prometheus.Counter)
if ok {
counter.Inc()
}
taskWithId := TaskFuncWithId{
Task: task,
TaskId: taskId,
}
// Block until the task is accepted or the pool shuts down. A select with
// a default branch here would spin hot and flood the logs whenever the
// publishers channel is full.
select {
case wp.publishers <- taskWithId:
// Task sent successfully
gauge, ok := wp.stats["psTasksQueueSize"].(prometheus.Gauge)
if ok {
gauge.Inc()
}
case <-wp.ctx.Done():
return 0, nil, errors.New("worker pool is shutting down")
}
wp.logger.Log(fmt.Sprintf("worker pool submitted task %d\n", taskId))
return taskId, wp.ResultChan, nil
}
func (wp *WorkerPool) WaitAll() {
wp.wg.Wait()
wp.logger.Log("all tasks completed")
}
func (wp *WorkerPool) Stop() {
atomic.StoreInt32(&wp.isStopped, 1)
// First, stop all workers by cancelling the context.
wp.cancel()
// Wait for all workers to finish.
wp.wg.Wait()
for _, stopChan := range wp.workerStopChans {
close(stopChan)
}
// Close the publishers channel to signal no more tasks will be sent.
// This is safe only after we have ensured all workers have stopped.
close(wp.publishers)
close(wp.ResultChan)
//TODO: Drain the resultChan.
// Optionally, you can also drain the resultChan here if needed,
// and possibly close it if no more results will be processed.
// Be aware that closing a channel while it is still being written to
// by other goroutines will cause a panic.
}
//https://github.com/MohanL/workerpool
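Finally, a usage sketch, assuming the package is importable from the repository above (the import path, worker counts, and task bodies are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/MohanL/workerpool"
)

func main() {
	pool := workerpool.NewWorkerPool(workerpool.WorkerPoolConfig{
		MaxWorkers:    4,
		Timeout:       5 * time.Second,
		TaskQueueSize: 16,
	})

	// Submit a handful of tasks; each returns its input squared.
	const n = 5
	for i := 1; i <= n; i++ {
		i := i
		if _, _, err := pool.Submit(func() (interface{}, error) {
			return i * i, nil
		}); err != nil {
			fmt.Println("submit failed:", err)
		}
	}

	// Collect exactly as many results as were submitted.
	for i := 0; i < n; i++ {
		r := <-pool.ResultChan
		fmt.Printf("task %d -> %v (err: %v)\n", r.TaskId, r.Result, r.Error)
	}

	pool.Stop()
}
```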